Worker Pool 防止饿死#51
Conversation
owent
left a comment
There was a problem hiding this comment.
AICR problem for src/atframe/modules/worker_pool_module.cpp:282
AI Code Review
AI Code Review Summaryworker_pool_module 新增每秒保底休眠机制审查Target: Worker Pool 防止饿死 审查范围审查了 发现问题
分析说明变更在工作线程中添加了 核心关注点是对两个独立原子变量的复合操作(先分别 load 求和判断、再分别 store 重置)在逻辑上不是原子操作。当前所有访问均在工作线程内顺序完成,无实际的跨线程读取场景,因此暂无实际竞态导致的数据损坏风险。但代码使用了 建议如需支持跨线程读取统计值,建议:
Problems (1)
Code reference:
|
There was a problem hiding this comment.
Pull request overview
This PR adds worker-pool throttling intended to prevent starvation by tracking per-worker busy/wait time and inserting a preserved sleep window once accumulated tick time reaches one second.
Changes:
- Adds per-worker counters for current-second busy and waited microseconds.
- Adds a worker-set preserve interval defaulting to 8000µs.
- Inserts an extra condition-variable wait after each accumulated second of worker activity.
Comments suppressed due to low confidence (1)
src/atframe/modules/worker_pool_module.cpp:481
- This added line exceeds the repository's
.clang-formatColumnLimit: 120; please wrap it or run clang-format to keep formatting consistent.
auto wait_preserve_us = self->get_owner().configure_tick_preserve_microseconds_in_second.load(std::memory_order_acquire);
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| auto wait_preserve_us = self->get_owner().configure_tick_preserve_microseconds_in_second.load(std::memory_order_acquire); | ||
| if (wait_preserve_us <= 0) { | ||
| wait_preserve_us = 8000; | ||
| } | ||
|
|
||
| std::unique_lock<std::mutex> lk_cv(self->waker_lock_); | ||
| self->status_.store(static_cast<uint8_t>(worker_status::kSleeping), std::memory_order_release); | ||
| self->waker_cv_.wait_for(lk_cv, std::chrono::microseconds(wait_preserve_us)); | ||
| self->status_.store(static_cast<uint8_t>(worker_status::kRunning), std::memory_order_release); | ||
|
|
||
| self->cpu_time_sleep_us_.fetch_add(wait_preserve_us, std::memory_order_release); |
owent
left a comment
There was a problem hiding this comment.
AICR problem for src/atframe/modules/worker_pool_module.cpp:491
owent
left a comment
There was a problem hiding this comment.
AICR problem for src/atframe/modules/worker_pool_module.cpp:463
owent
left a comment
There was a problem hiding this comment.
MEDIUM
·
correctness
cpu_time_sleep_us_ 会重复计算睡眠时间。当常规睡眠(line 467)和保护性睡眠(line 493)在同一迭代中执行时,busy_end_time 始终是第一次工作结束的时间点。Line 474 已将第一次睡眠时长加到 cpu_time_sleep_us_,而 line 500 计算的 sleep_rep = sleep_end_time - busy_end_time 包含了第一次睡眠时长和保护性睡眠时长,导致第一次睡眠被重复计算。
触发场景:Worker 在一次迭代中,tick 工作时间短于 tick_interval(触发常规睡眠),且累计的 busy+waited 时间达到 1 秒(触发保护性睡眠)。
建议修复:在保护性睡眠前捕获当前时间(如 preserve_sleep_start = std::chrono::system_clock::now()),在 line 500 使用 sleep_end_time - preserve_sleep_start 计算实际保护性睡眠时长。
Location: src/atframe/modules/worker_pool_module.cpp:500
owent
left a comment
There was a problem hiding this comment.
LOW
·
correctness
保护性睡眠的时间未记录到 current_tick_second_waited_us_。Lines 482-483 将每秒计数器重置为 0 后,保护性睡眠的时长只被添加到 cpu_time_sleep_us_(line 500),但 current_tick_second_waited_us_ 未被更新,导致新的一秒内的等待统计缺失这段睡眠时间。
Location: src/atframe/modules/worker_pool_module.cpp:501
owent
left a comment
There was a problem hiding this comment.
AICR problem for src/atframe/modules/worker_pool_module.cpp:500
| if (sleep_end_time > preserve_sleep_start) { | ||
| auto sleep_rep = | ||
| std::chrono::duration_cast<std::chrono::microseconds>(sleep_end_time - preserve_sleep_start).count(); | ||
| self->cpu_time_sleep_us_.fetch_add(sleep_rep, std::memory_order_release); |
There was a problem hiding this comment.
MEDIUM
·
correctness
preserve sleep 时间被添加到 cpu_time_sleep_us_ 但未添加到 current_tick_second_waited_us_。这与普通 sleep 路径(第 473-474 行)的行为不一致,后者同时更新两个计数器。缺失的更新会导致 per-second 时间追踪不准确,并可能影响 preserve 触发周期的规律性。
触发场景:当 worker 的 busy + waited 时间累计超过 1 秒后触发 preserve sleep 时,该 sleep 时长不会被计入下一周期的 waited 累计值。
建议修复:在第 500 行的 fetch_add 之后添加 self->current_tick_second_waited_us_.fetch_add(sleep_rep, std::memory_order_release);
Location: src/atframe/modules/worker_pool_module.cpp:500
Referenced code (src/atframe/modules/worker_pool_module.cpp:500):
auto sleep_rep =
std::chrono::duration_cast<std::chrono::microseconds>(sleep_end_time - preserve_sleep_start).count();
self->cpu_time_sleep_us_.fetch_add(sleep_rep, std::memory_order_release);
}
}
owent
left a comment
There was a problem hiding this comment.
AICR problem for src/atframe/modules/worker_pool_module.cpp:478
| } | ||
| } | ||
|
|
||
| if (self->current_tick_second_busy_us_.load(std::memory_order_acquire) + |
There was a problem hiding this comment.
MEDIUM
·
concurrency
对 current_tick_second_busy_us_ 和 current_tick_second_waited_us_ 的检查与重置不是原子操作。这两个变量都是 std::atomic,表明可能被其他线程读取(如监控统计)。在多线程读取场景下,检查两个变量之和与重置之间存在竞态:其他线程可能在两个 store(0) 之间读取到不一致的状态(如 busy=0 但 waited 仍为旧值)。建议使用单一的原子变量或互斥锁保护这对计数器的检查-重置操作。
Location: src/atframe/modules/worker_pool_module.cpp:478-482
Referenced code (src/atframe/modules/worker_pool_module.cpp:478-482):
}
if (self->current_tick_second_busy_us_.load(std::memory_order_acquire) +
self->current_tick_second_waited_us_.load(std::memory_order_acquire) >=
1000000) {
self->current_tick_second_busy_us_.store(0, std::memory_order_release);
self->current_tick_second_waited_us_.store(0, std::memory_order_release);
auto wait_preserve_us =
owent
left a comment
There was a problem hiding this comment.
AICR problem for src/atframe/modules/worker_pool_module.cpp:481
| if (self->current_tick_second_busy_us_.load(std::memory_order_acquire) + | ||
| self->current_tick_second_waited_us_.load(std::memory_order_acquire) >= | ||
| 1000000) { | ||
| self->current_tick_second_busy_us_.store(0, std::memory_order_release); |
There was a problem hiding this comment.
LOW
·
correctness
当累计时间超过 1 秒时,超出部分会被丢弃。例如:busy_us 从 950,000 增加到 1,150,000 后触发重置,额外的 150,000 微秒将丢失,导致统计不准确。建议在重置时保留超出部分的值,或使用取模操作计算实际保留的时间。
Location: src/atframe/modules/worker_pool_module.cpp:481-482
Referenced code (src/atframe/modules/worker_pool_module.cpp:481-482):
self->current_tick_second_waited_us_.load(std::memory_order_acquire) >=
1000000) {
self->current_tick_second_busy_us_.store(0, std::memory_order_release);
self->current_tick_second_waited_us_.store(0, std::memory_order_release);
auto wait_preserve_us =Suggested fix:
在重置时保留超出 1 秒的部分,例如:auto overflow = busy_val + waited_val - 1000000; current_tick_second_busy_us_.store(overflow, ...);
| std::atomic<std::chrono::microseconds::rep> current_tick_second_busy_us_; | ||
| std::atomic<std::chrono::microseconds::rep> current_tick_second_waited_us_; |
| self->cpu_time_last_minute_busy_us_.fetch_add(busy_rep, std::memory_order_release); | ||
| } | ||
|
|
||
| // Maybe sleep until timeout or next event |
| } | ||
| } | ||
|
|
owent
left a comment
There was a problem hiding this comment.
AICR problem for src/atframe/modules/worker_pool_module.cpp:282
| worker_tick_action_container_type tick_handles_; | ||
| std::recursive_mutex tick_handle_lock_; | ||
| std::atomic<std::chrono::microseconds::rep> current_tick_interval_us_; | ||
| std::atomic<std::chrono::microseconds::rep> current_tick_second_busy_us_; |
There was a problem hiding this comment.
MEDIUM
·
correctness
新增的原子成员变量 current_tick_second_busy_us_ 和 current_tick_second_waited_us_ 未在 diff 中显示初始化。std::atomic 的默认构造函数对于整型类型会保留未初始化的值。如果 worker 类的构造函数没有显式初始化这些变量,它们将持有不确定的值,导致首次使用时的行为未定义。
建议在构造函数中添加初始化,例如:
current_tick_second_busy_us_.store(0, std::memory_order_release);
current_tick_second_waited_us_.store(0, std::memory_order_release);或使用成员初始化列表/默认成员初始化器。
Location: src/atframe/modules/worker_pool_module.cpp:282-283
Referenced code (src/atframe/modules/worker_pool_module.cpp:282-283):
std::recursive_mutex tick_handle_lock_;
std::atomic<std::chrono::microseconds::rep> current_tick_interval_us_;
std::atomic<std::chrono::microseconds::rep> current_tick_second_busy_us_;
std::atomic<std::chrono::microseconds::rep> current_tick_second_waited_us_;
std::atomic<std::chrono::microseconds::rep> cpu_time_busy_us_;
owent
left a comment
There was a problem hiding this comment.
AICR problem for src/atframe/modules/worker_pool_module.cpp:481
| } | ||
| } | ||
|
|
||
| if (self->current_tick_second_busy_us_.load(std::memory_order_acquire) + |
There was a problem hiding this comment.
LOW
·
concurrency
非原子的计数器检查与重置操作可能导致监控数据瞬时不一致。current_tick_second_busy_us_ 和 current_tick_second_waited_us_ 的读取和重置分为四个独立的原子操作,其他线程在读取这两个值用于监控时可能观察到:一个已重置而另一个未重置的中间状态,导致该瞬间的 busy + waited 总和被低估。对于监控用途这种情况通常可接受,但如果需要精确一致性,应考虑使用互斥锁或将两个计数器合并为单个 128 位原子值。
Location: src/atframe/modules/worker_pool_module.cpp:481
Referenced code (src/atframe/modules/worker_pool_module.cpp:481):
}
if (self->current_tick_second_busy_us_.load(std::memory_order_acquire) +
self->current_tick_second_waited_us_.load(std::memory_order_acquire) >=
1000000) {Suggested fix:
如果监控需要精确一致性,考虑使用 mutex 保护这两个计数器的读写,或使用单个 64 位原子值存储总时间(busy + waited)。
owent
left a comment
There was a problem hiding this comment.
AICR problem for src/atframe/modules/worker_pool_module.cpp:481
| } | ||
| } | ||
|
|
||
| if (self->current_tick_second_busy_us_.load(std::memory_order_acquire) + |
There was a problem hiding this comment.
MEDIUM
·
concurrency
对两个独立的原子变量分别执行 load 后求和判断、再分别 store 重置的操作整体上不是原子的。当前所有读写操作均在同一个工作线程内顺序执行,暂时不存在实际的竞态条件。但这些原子变量使用了 memory_order_acquire/release 语义,暗示设计上可能预期被其他线程读取(如监控场景)。如果未来添加监控线程读取这些统计值,检查与重置之间的时间窗口可能导致读取到不一致状态(一个已重置、另一个未重置)。建议考虑使用单一原子变量累计时间,或在需要跨线程读取时使用互斥锁保护复合操作。
Location: src/atframe/modules/worker_pool_module.cpp:481
Referenced code (src/atframe/modules/worker_pool_module.cpp:481):
}
if (self->current_tick_second_busy_us_.load(std::memory_order_acquire) +
self->current_tick_second_waited_us_.load(std::memory_order_acquire) >=
1000000) {Suggested fix:
考虑将 busy 和 waited 时间合并到单一原子变量中累积,或使用互斥锁保护检查和重置的复合操作。
No description provided.